package com.ycl.javacore.rpctest.remote;

import com.alibaba.fastjson.JSONObject;
import com.ycl.javacore.rpctest.config.ServiceConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * User: OF1089 杨成龙
 * Date: 2019/8/23
 * Time: 4:58 PM
 * Desc: 接受的消息这里处理
 */
public class RpcInvokerHandler extends ChannelInboundHandlerAdapter {

    private Map<String, Method> interfaceMethods;

    private Map<Class, Object> interfaceToInstance;

    private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(100),
            new ThreadFactory() {
                AtomicInteger m = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "IO-thread-" + m.incrementAndGet());
                }
            });

    public RpcInvokerHandler(List<ServiceConfig> serviceConfigList, Map<String, Method> interfaceMethods) {
        this.interfaceToInstance = new ConcurrentHashMap<>();
        this.interfaceMethods = interfaceMethods;

        for (ServiceConfig config : serviceConfigList) {
            interfaceToInstance.put(config.getType(), config.getInstance());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        String message = (String) msg;
        //这里拿到一串JSON数据，解析为Request对象
        System.out.println("接受到消息：" + msg);
        RpcRequest rpcRequest = RpcRequest.parse(message, ctx);
        threadPoolExecutor.execute(new RpcInvokeTask(rpcRequest));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        System.out.println("发生了异常。。。" + cause);
        cause.printStackTrace();
        ctx.close();
    }

    public class RpcInvokeTask implements Runnable {
        private RpcRequest rpcRequest;

        RpcInvokeTask(RpcRequest rpcRequest) {
            this.rpcRequest = rpcRequest;
        }

        @Override
        public void run() {
            try {
                String interfaceIdentity = rpcRequest.getInterfaceIdentity();
                Method method = interfaceMethods.get(interfaceIdentity);
                Map<String, String> map = string2Map(interfaceIdentity);

                String interfaceName = map.get("interface");
                Class interfaceClass = Class.forName(interfaceName);
                Object o = interfaceToInstance.get(interfaceClass);

                String parameterString = map.get("parameter");

                Object result = null;
                if (parameterString != null) {
                    String[] parameterTypeClass = parameterString.split(",");
                    Map<String, Object> parameterMap = rpcRequest.getParameterMap();
                    Object[] parameterInstance = new Object[parameterTypeClass.length];
                    for (int i = 0; i < parameterTypeClass.length; i++) {
                        String parameterClazz = parameterTypeClass[i];
                        parameterInstance[i] = parameterMap.get(parameterClazz);
                    }
                    result = method.invoke(o, parameterInstance);
                } else {
                    result = method.invoke(o);
                }

                ChannelHandlerContext ctx = rpcRequest.getCtx();
                String requestId = rpcRequest.getRequestId();
                RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);
                String s = JSONObject.toJSONString(rpcResponse) + "$$";
                ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());
                ctx.writeAndFlush(byteBuf);

                System.out.println("响应给客户端：" + s);

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static Map<String, String> string2Map(String str) {
        String[] split = str.split("&");
        Map<String, String> map = new HashMap<>();
        for (String s : split) {
            String[] split1 = s.split("=");
            map.put(split1[0], split1[1]);
        }
        return map;
    }
}
